#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2015-2018 Linaro Limited
#
# Author: Remi Duraffort <remi.duraffort@linaro.org>
#
# This file is part of LAVA Dispatcher.
#
# LAVA Dispatcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# LAVA Dispatcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses>.

import argparse
import contextlib
import errno
import fcntl
import glob
import logging
import logging.handlers
import lzma
import os
import re
import shutil
import signal
import sqlite3
import socket
import subprocess
import sys
import time
import traceback
import zmq
import zmq.auth
from urllib.parse import urlparse
from zmq.utils.strtypes import b, u

from lava_common.compat import yaml_safe_load
from lava_common.constants import DISPATCHER_DOWNLOAD_DIR
from lava_common.version import __version__
from lava_dispatcher.job import ZMQConfig

# pylint: disable=no-member
# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals


###########
# Constants
###########
# Seconds a canceling job may keep running before it is killed (SIGKILL).
# A second SIGTERM is sent once half of this delay has elapsed.
FINISH_MAX_DURATION = 120
# Minimum delay (in seconds) between two job status checks. Also the delay
# after which the END message of a finished job is sent again.
JOBS_CHECK_INTERVAL = 5
# Protocol version sent to the master in the HELLO and HELLO_RETRY messages
PROTOCOL_VERSION = 3
SEND_QUEUE = 10  # zmq high water mark (used for zmq.SNDHWM)
# Poll timeout, in seconds. Also used as the initial ping interval and as the
# smallest ping interval accepted from the master.
TIMEOUT = 5  # zmq timeout
# Default value of the --slave-dir option
SLAVE_DIR = "/var/lib/lava/dispatcher/slave"

#########
# Globals
#########
# Directory under which the per-job directories are created.
# main() rebinds it when --slave-dir is not the default.
tmp_dir = os.path.join(SLAVE_DIR, "tmp")


# Create the logger that will be configured later.
# logging.Formatter reads its "converter" attribute to turn the record
# creation time into a struct_time: point it to gmtime so that the timestamps
# are in UTC.
logging.Formatter.converter = time.gmtime
LOG = logging.getLogger("lava-slave")
FORMAT = "%(asctime)-15s %(levelname)7s %(message)s"

# Check for stale resources every STALE_CHECK_INTERVAL seconds (defaults to 1 hour)
# NOTE(review): not referenced anywhere else in this file - confirm it is still needed.
STALE_CHECK_INTERVAL = 3600
# Resources to remove when the master acknowledges the end of a job (END_OK).
# Each key is a directory and each value a dict that handle_end_ok() reads:
#   "pattern": name, inside the directory, of the entry to remove
#   "glob": glob, inside the directory, of the entries to remove
#   "post-action": optional command (list of arguments) to run after a removal
# "pattern" and "glob" are formatted with the job prefix and the job id.
# NOTE(review): the tmp_dir key is evaluated once, at import time.
STALE_CONFIG = {
    DISPATCHER_DOWNLOAD_DIR: {"pattern": "{prefix}{job_id}"},
    tmp_dir: {"pattern": "{prefix}{job_id}"},
}

# Set by main() from the --debug option. When set, lava-run is started with
# --debug and writes to the stdout and stderr of the slave.
debug = False


#########
# Helpers
#########


def create_environ(env):
    """
    Build the environment variables handed to the job process.

    :param env: a YAML document (as a string) or a false value. The document
    can define "purge" (start from an empty environment), "removes" (names of
    variables to drop) and "overrides" (variables to set).
    :return a new dict, based on a copy of os.environ
    """
    environ = dict(os.environ)
    if not env:
        return environ

    conf = yaml_safe_load(env)
    if not conf:
        return environ

    if conf.get("purge", False):
        environ = {}
    # Drop the listed variables, ignoring the ones that are not defined
    for name in conf.get("removes", {}):
        environ.pop(name, None)
    # Apply the overrides last so they always win
    environ.update(conf.get("overrides", {}))
    return environ


def get_fqdn():
    """
    Return the fully qualified domain name of this host.

    :raise ValueError: if the name reported by socket.getfqdn() contains
    anything other than letters, digits, '-', '_' and '.'
    """
    fqdn = socket.getfqdn()
    if re.match("[-_a-zA-Z0-9.]+$", fqdn) is None:
        raise ValueError("Your FQDN contains invalid characters")
    return fqdn


def mkdir(path):
    """
    Create a directory (and its missing parents) only if needed.

    An already existing directory is not an error. Any other failure,
    including the path existing without being a directory, is raised.
    """
    try:
        os.makedirs(path, mode=0o755)
    except FileExistsError:
        # Only a pre-existing directory is acceptable
        if not os.path.isdir(path):
            raise


def get_prefix(cfg):
    """
    Return the "prefix" entry of the dispatcher configuration.

    :param cfg: the parsed dispatcher configuration
    :return the prefix, or "" when cfg is not a dict or has no "prefix"
    """
    return cfg.get("prefix", "") if isinstance(cfg, dict) else ""


def send_multipart_u(sock, data):
    """
    Send a multipart message to the master, converting each part to bytes.

    The b"master" frame is always sent first.

    :param sock: The socket to use
    :param data: Parts of the message, converted to byte strings
    :return whatever sock.send_multipart returns
    """
    frames = [b"master"]
    frames.extend(b(part) for part in data)
    return sock.send_multipart(frames)


def start_job(
    job_id,
    definition,
    device_definition,
    zmq_config,
    dispatcher_config,
    env_str,
    env_dut_str,
):
    """
    Start the lava-run process and return the pid

    The job, device and dispatcher configurations (and the DUT environment, if
    any) are written to the job directory, under tmp_dir. lava-run is then
    started under nice, in its own process group. Its output goes to the
    "stdout" and "stderr" files of the job directory or, in debug mode, to the
    stdout and stderr of the slave.

    :param job_id: the job id
    :param definition: the job definition (string)
    :param device_definition: the device definition (string)
    :param zmq_config: the zmq configuration (logging url, proxy, ipv6 and
    certificates) forwarded to lava-run
    :param dispatcher_config: the dispatcher configuration (YAML string)
    :param env_str: YAML description of the environment of lava-run
    :param env_dut_str: the DUT environment (string), can be empty
    :return the pid of lava-run or None if the process was not started
    """
    # Create the base directory
    dispatcher_cfg = yaml_safe_load(dispatcher_config)
    base_dir = os.path.join(
        tmp_dir,
        "{prefix}{job_id}".format(
            prefix=get_prefix(dispatcher_cfg), job_id=str(job_id)
        ),
    )
    mkdir(base_dir)

    # Write back the job, device and dispatcher configuration
    with open(os.path.join(base_dir, "job.yaml"), "w") as f_job:
        f_job.write(definition)
    with open(os.path.join(base_dir, "device.yaml"), "w") as f_device:
        f_device.write(device_definition)
    with open(os.path.join(base_dir, "dispatcher.yaml"), "w") as f_dispatcher:
        f_dispatcher.write(dispatcher_config)

    # Dump the environment variables in the tmp file.
    if env_dut_str:
        with open(os.path.join(base_dir, "env.dut.yaml"), "w") as f_env:
            f_env.write(env_dut_str)

    stderr_path = os.path.join(base_dir, "stderr")
    # Bound before the try block so that the error handling and the cleanup
    # can always use them, wherever the failure happens.
    args = []
    out_file = None
    err_file = None
    try:
        if debug:
            out_file = sys.stdout
            err_file = sys.stderr
        else:
            out_file = open(os.path.join(base_dir, "stdout"), "w")
            err_file = open(stderr_path, "w")
        env = create_environ(env_str)
        lava_run = os.path.join(os.path.dirname(__file__), "lava-run")
        args = [
            # Run lava-run under nice so every sub-commands will be niced
            "nice",
            lava_run,
            "--device=%s" % os.path.join(base_dir, "device.yaml"),
            "--dispatcher=%s" % os.path.join(base_dir, "dispatcher.yaml"),
            "--output-dir=%s" % base_dir,
            "--job-id=%s" % job_id,
            "--logging-url=%s" % zmq_config.logging_url,
        ]
        if debug:
            args.append("--debug")
        args.append(os.path.join(base_dir, "job.yaml"))

        if zmq_config.socks_proxy:
            args.extend(["--socks-proxy", zmq_config.socks_proxy])

        if zmq_config.ipv6:
            args.append("--ipv6")

        # Use certificates if defined
        if zmq_config.master_cert is not None and zmq_config.slave_cert is not None:
            args.extend(
                [
                    "--master-cert",
                    zmq_config.master_cert,
                    "--slave-cert",
                    zmq_config.slave_cert,
                ]
            )
        if env_dut_str:
            args.append("--env-dut=%s" % os.path.join(base_dir, "env.dut.yaml"))

        proc = subprocess.Popen(
            args, stdout=out_file, stderr=err_file, env=env, preexec_fn=os.setpgrp
        )
        return proc.pid
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error("[%d] Unable to start: %s", job_id, args)
        # daemon must always continue running even if the job crashes
        LOG.exception("[%d] %s", job_id, getattr(exc, "child_traceback", exc))
        # Keep a trace of the failure in the "stderr" file of the job
        # directory. Open it by path: err_file is a file object (or
        # sys.stderr), not a file name.
        with contextlib.suppress(OSError):
            with open(stderr_path, "a") as errlog:
                errlog.write("%s\n%s\n" % (exc, traceback.format_exc()))
        # The process has not started
        # The END message will be sent the next time
        # check_job_status is run
        return None
    finally:
        # The child process has its own copies of the descriptors: close ours
        # so the slave does not leak two descriptors per job. Never close the
        # streams of the slave itself (debug mode).
        for handle in (out_file, err_file):
            if handle is None or handle is sys.stdout or handle is sys.stderr:
                continue
            handle.close()


#########
# Classes
#########


class Job:
    """
    Wrapper around a job process.

    Built from a row of the jobs table. Creating the object also creates the
    job directory (under tmp_dir) when it does not exist yet.
    """

    # Possible values of the "status" column
    RUNNING, CANCELING, FINISHED = range(3)

    def __init__(self, row):
        """
        :param row: a row of the jobs table, providing "id", "pid", "status",
        "prefix" and "last_update"
        """
        self.job_id = row["id"]
        self.pid = row["pid"]
        self.status = row["status"]
        self.prefix = row["prefix"]
        self.last_update = row["last_update"]
        # Create the base directory
        self.base_dir = os.path.join(
            tmp_dir,
            "{prefix}{job_id}".format(prefix=self.prefix, job_id=str(self.job_id)),
        )
        mkdir(self.base_dir)

    def errors(self):
        """
        Return the content of the "stderr" file of the job directory, or ""
        when the file cannot be read.
        """
        with contextlib.suppress(OSError):
            # Use a context manager so the file is always closed
            with open(os.path.join(self.base_dir, "stderr"), "r") as f_err:
                return f_err.read()
        return ""

    def description(self):
        """
        Return the lzma-compressed content of "description.yaml", or None
        when the file cannot be read or compressed.
        """
        try:
            filename = os.path.join(self.base_dir, "description.yaml")
            with open(filename, "r") as f_desc:
                data = f_desc.read()
            return lzma.compress(b(data))
        except (OSError, lzma.LZMAError) as exc:
            LOG.error("[%d] Unable to read 'description.yaml'", self.job_id)
            LOG.exception(exc)
            return None

    def kill(self):
        """Send SIGKILL to the process."""
        # If the pid is 0, just skip because lava-run was not started
        if self.pid == 0:
            return
        os.kill(self.pid, signal.SIGKILL)

    def terminate(self):
        """Send SIGTERM to the process."""
        # If the pid is 0, just skip because lava-run was not started
        if self.pid == 0:
            return
        os.kill(self.pid, signal.SIGTERM)

    def is_running(self):
        """
        Return True if /proc/<pid>/cmdline can be read and contains
        "lava-run", False otherwise.
        """
        with contextlib.suppress(OSError):
            with open("/proc/%d/cmdline" % self.pid, "r") as fd:
                return "lava-run" in fd.read()
        return False

    def send_end(self, sock):
        """
        Send the END message, with the errors and the compressed description,
        to the master.

        :param sock: the zmq socket
        """
        errors = self.errors()
        description = self.description()
        if description is None:
            # Without a description, lava-run is considered to have crashed
            LOG.error("[%d] lava-run crashed", self.job_id)
            if errors:
                LOG.error(errors)
            description = ""
        LOG.debug("END(%d) => master", self.job_id)
        send_multipart_u(sock, ["END", str(self.job_id), errors, description])

    def send_start_ok(self, sock):
        """
        Send the START_OK message to the master.

        :param sock: the zmq socket
        """
        LOG.debug("START_OK(%d) => master", self.job_id)
        send_multipart_u(sock, ["START_OK", str(self.job_id)])


class JobsDB:
    """
    Store the jobs known to this slave in a sqlite database.

    The "jobs" table has one row per job: id, pid, status (one of the Job
    status constants), last_update (unix timestamp) and prefix.
    """

    def __init__(self, dbname):
        """
        Open the database, creating or migrating the jobs table if needed.

        :param dbname: path to the sqlite database
        """
        self.conn = sqlite3.connect(dbname)
        self.conn.row_factory = sqlite3.Row
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS jobs(id INTEGER PRIMARY KEY, pid INTEGER, status INTEGER, last_update INTEGER, prefix VARCHAR(100) DEFAULT '')"
        )
        self.conn.commit()
        # Migrate the schema only if needed.
        # Look at the columns of the jobs table itself rather than at the
        # text of the first schema entry, which could belong to another
        # object or contain "prefix" for an unrelated reason.
        columns = [col["name"] for col in self.conn.execute("PRAGMA table_info(jobs)")]
        if "prefix" not in columns:
            self.conn.execute(
                "ALTER TABLE jobs ADD COLUMN prefix VARCHAR(100) DEFAULT ''"
            )
            self.conn.commit()

    def create(self, job_id, pid, status, dispatcher_cfg):
        """
        Insert a new job.

        When pid is 0, the pid is unknown

        :param dispatcher_cfg: the parsed dispatcher configuration, used to
        get the prefix
        :return the new Job or None if the database raised an error
        """
        # Keep the prefix (if present) in the database to later delete
        # resources
        prefix = get_prefix(dispatcher_cfg)

        with contextlib.suppress(sqlite3.Error):
            self.conn.execute(
                "INSERT INTO jobs VALUES(?, ?, ?, ?, ?)",
                (str(job_id), str(pid), str(status), str(int(time.time())), prefix),
            )
            self.conn.commit()
            return self.get(job_id)
        return None

    def get(self, job_id):
        """Return the Job with the given id or None if it is unknown."""
        row = self.conn.execute(
            "SELECT * FROM jobs WHERE id=?", (str(job_id),)
        ).fetchone()
        return None if row is None else Job(row)

    def update(self, job_id, status):
        """
        Set the status of a job and refresh its last_update timestamp.

        :return the updated Job (None if the job is unknown or if the
        database raised an error)
        """
        with contextlib.suppress(sqlite3.Error):
            self.conn.execute(
                "UPDATE jobs SET status=?, last_update=? WHERE id=?",
                (str(status), str(int(time.time())), str(job_id)),
            )
            self.conn.commit()
            return self.get(job_id)
        return None

    def delete(self, job_id):
        """Remove a job from the table, ignoring database errors."""
        with contextlib.suppress(sqlite3.Error):
            self.conn.execute("DELETE FROM jobs WHERE id=?", (str(job_id),))
            self.conn.commit()

    def all_ids(self):
        """Return the list of all the job ids."""
        return [row["id"] for row in self.conn.execute("SELECT id FROM jobs")]

    def _by_status(self, status):
        """Yield a Job for every row with the given status."""
        # Fetch all the rows before yielding: callers update the table (and
        # commit) while iterating, and what a running query sees of changes
        # made on the same connection is undefined.
        rows = self.conn.execute(
            "SELECT * FROM jobs WHERE status=?", (str(status),)
        ).fetchall()
        for row in rows:
            yield Job(row)

    def running(self):
        """Iterate over the jobs in the RUNNING state."""
        return self._by_status(Job.RUNNING)

    def canceling(self):
        """Iterate over the jobs in the CANCELING state."""
        return self._by_status(Job.CANCELING)

    def finished(self):
        """Iterate over the jobs in the FINISHED state."""
        return self._by_status(Job.FINISHED)


class Master:
    """
    Remember when the master was last heard from, when it was last pinged and
    whether it is considered online.
    """

    def __init__(self):
        # Start with TIMEOUT. Master will send the right value to use
        # afterward.
        self.ping_interval = TIMEOUT
        self.online = False
        self.last_ping = 0
        self.last_msg = 0

    def ping(self, sock):
        """
        PING the master whenever needed

        Nothing is sent as long as a message was received from the master, or
        a PING was sent, within the last ping interval. The master is flagged
        as offline when nothing was received for more than four intervals.

        :param sock: the zmq socket
        """
        now = time.time()
        quiet_for = now - max(self.last_msg, self.last_ping)
        if quiet_for <= self.ping_interval:
            return

        since_last_msg = now - self.last_msg
        # Is the master offline ?
        if self.online and since_last_msg > 4 * self.ping_interval:
            LOG.warning("Master goes OFFLINE")
            self.online = False

        LOG.debug("PING => master (last message %ss ago)", int(since_last_msg))
        send_multipart_u(sock, ["PING"])
        self.last_ping = now

    def received_msg(self):
        """
        Record that a valid message was just received from the master
        """
        self.last_msg = time.time()
        if self.online:
            return
        LOG.info("Master is ONLINE")
        self.online = True


#########
# Control
#########


def create_context(options):
    """
    Build everything needed to talk to the master and to be told about
    signals.

    :param options: the command line options
    :return A tuple with: the zmq context, the zmq socket, the zmq poller, a
    read pipe and a write pipe.
    """
    # Socket used to talk to the master dispatcher.
    context = zmq.Context()
    sock = context.socket(zmq.ROUTER)
    socket_options = (
        (zmq.IDENTITY, b(options.hostname)),
        # Limit the number of messages in the queue
        (zmq.SNDHWM, SEND_QUEUE),
        # From http://api.zeromq.org/4-2:zmq-setsockopt#toc5
        # "Immediately readies that connection for data transfer with the master"
        (zmq.CONNECT_RID, b"master"),
    )
    for option, value in socket_options:
        sock.setsockopt(option, value)

    if options.socks_proxy:
        LOG.info("[INIT] Using a socks proxy")
        sock.setsockopt(zmq.SOCKS_PROXY, b(options.socks_proxy))

    if options.ipv6:
        LOG.info("[INIT] Enabling IPv6")
        sock.setsockopt(zmq.IPV6, 1)

    # Certificates are only loaded when encryption is requested
    if options.encrypt:
        LOG.info("[INIT] Starting encryption")
        LOG.debug("[INIT] Opening slave cert: %s", options.slave_cert)
        client_public, client_private = zmq.auth.load_certificate(options.slave_cert)
        sock.curve_publickey = client_public
        sock.curve_secretkey = client_private
        LOG.debug("[INIT] Opening master cert: %s", options.master_cert)
        server_public, _ = zmq.auth.load_certificate(options.master_cert)
        sock.curve_serverkey = server_public
    else:
        LOG.debug("[INIT] Connection is not encrypted")

    sock.connect(options.master)

    # The poller watches the socket and, below, the read end of the pipe.
    poller = zmq.Poller()
    poller.register(sock, zmq.POLLIN)

    # Signals are not handled where they land: the handler only writes the
    # signal number to a pipe. The pipe is polled along with the zmq socket,
    # so signals are only noticed while waiting for data.
    pipe_r, pipe_w = os.pipe()
    pipe_flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
    fcntl.fcntl(pipe_w, fcntl.F_SETFL, pipe_flags | os.O_NONBLOCK)

    def signal_to_pipe(signum, _frame):
        # Send the signal number on the pipe
        os.write(pipe_w, b(chr(signum)))

    for signum in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
        signal.signal(signum, signal_to_pipe)
    poller.register(pipe_r, zmq.POLLIN)

    return context, sock, poller, pipe_r, pipe_w


def recv_from_master(prefix, poller, pipe_r, sock):
    """
    Wait (at most TIMEOUT seconds) for a message from the master or a signal

    :param prefix: string prepended to every log message
    :param poller: the zmq poller
    :param pipe_r: the read pipe on which signals are notified
    :param sock: the zmq socket
    :return a tuple with (leaving, message). leaving is True when a signal was
    received. message is the message without its first frame (the master
    identity), or None when there is nothing valid to handle.
    """
    nothing = (False, None)

    try:
        events = dict(poller.poll(TIMEOUT * 1000))
    except zmq.error.ZMQError as exc:
        LOG.error(prefix + "A zmq error was raised: %s", str(exc))
        return nothing

    # Signals take precedence over messages
    if events.get(pipe_r) == zmq.POLLIN:
        LOG.info(prefix + "Received a signal, leaving")
        return (True, None)

    if events.get(sock) != zmq.POLLIN:
        return nothing

    msg = sock.recv_multipart()
    try:
        master_id = u(msg.pop(0))
    except (IndexError, TypeError):
        LOG.error(prefix + "Invalid message from master: %s", msg)
        return nothing

    # Check master identity
    if master_id != "master":
        LOG.error(prefix + "Invalid master id '%s'. Should be 'master'", master_id)
        return nothing

    return (False, msg)


def _finish_job(job, jobs, sock):
    """Reap the job process, send END to the master and mark the job FINISHED."""
    # wait for the job
    try:
        os.waitpid(job.pid, 0)
    except OSError as exc:
        LOG.debug("[%d] unable to wait for the process: %s", job.job_id, str(exc))
    LOG.info("[%d] Job END", job.job_id)
    job.send_end(sock)
    jobs.update(job.job_id, Job.FINISHED)


def check_job_status(jobs, sock, last_jobs_check):
    """Look for finished jobs

    Nothing is done when the last check is less than JOBS_CHECK_INTERVAL
    seconds old. Otherwise:
    * the END message of finished jobs is sent again (the master has not
      acknowledged it yet),
    * running and canceling jobs whose process is gone are reaped, marked as
      finished and END is sent,
    * canceling jobs that do not finish get a second SIGTERM, then a SIGKILL.

    :param jobs: the list of jobs
    :param sock: the zmq socket
    :param last_jobs_check: the last time the jobs were checked
    :return the time of the last check
    """
    now = time.time()
    if now - last_jobs_check < JOBS_CHECK_INTERVAL:
        return last_jobs_check

    # Re-send the END message (if needed)
    for job in jobs.finished():
        if now - job.last_update > JOBS_CHECK_INTERVAL:
            # Re-send the END message (the server hasn't answered yet)
            LOG.info("[%d] Job END (resend)", job.job_id)
            job.send_end(sock)

    # Check all running jobs.
    # Iterate over a snapshot because the rows are updated inside the loop.
    for job in list(jobs.running()):
        if not job.is_running():
            _finish_job(job, jobs, sock)

    # Check canceling jobs
    for job in list(jobs.canceling()):
        if not job.is_running():
            # Mark the job as finished, as for running jobs. Otherwise the
            # job stays in the canceling state and is reaped and logged again
            # on every check until the master answers.
            _finish_job(job, jobs, sock)
        elif time.time() - job.last_update > FINISH_MAX_DURATION:
            LOG.info("[%d] Job not finishing => killing", job.job_id)
            job.kill()
        elif time.time() - job.last_update > FINISH_MAX_DURATION / 2:
            LOG.info("[%d] Job not finishing => second signal", job.job_id)
            job.terminate()

    return time.time()


def verify_socket(sock, master_url, ipv6=False):
    """
    Check that the host of the master url can be resolved.

    :param sock: the zmq socket (only its identity is read, for the logs)
    :param master_url: the url of the master
    :param ipv6: when False, the IPv4 address of the master is also logged
    :return True if the resolution succeeded, False otherwise
    """
    identity = sock.getsockopt(zmq.IDENTITY).decode("utf-8")
    master = urlparse(master_url)
    LOG.debug(
        "[BTSP] Checking master [%s:%s] to create socket for %s",
        master.hostname,
        master.port,
        identity,
    )
    try:
        # getaddrinfo handles ipv4 and ipv6 but
        # returns a varying list of tuples which need extra parsing.
        socket.getaddrinfo(master.hostname, master.port)
        if not ipv6:
            # for IPv4, get useful log info with gethostbyname
            LOG.debug(
                "[BTSP] socket IPv4 address: %s", socket.gethostbyname(master.hostname)
            )
    except socket.gaierror as exc:
        # gaierror is raised for address resolution failures
        LOG.error("[BTSP] Socket DNS error: %s", exc)
        return False
    except socket.timeout as exc:
        LOG.error("[BTSP] Socket timeout error: %s", exc)
        return False
    except OSError as exc:
        LOG.error("[BTSP] Socket error: %s", exc)
        return False
    except Exception as exc:  # pylint: disable=broad-except
        # Never let the check itself take the daemon down
        LOG.error("[BTSP] Socket error 2: %s", exc)
        return False
    return True


def connect_to_master(poller, pipe_r, sock, master, ipv6):
    """
    Greet the master and wait for its answer.

    HELLO is sent once. Then, until HELLO_OK is received, HELLO_RETRY is sent
    after every poll for which the master host can be resolved.

    :param poller: the zmq poller
    :param pipe_r: the read pipe on which signals are notified
    :param sock: the zmq socket
    :param master: the url of the master
    :param ipv6: forwarded to verify_socket
    :return True once HELLO_OK is received, False if a signal was received
    """
    LOG.info("[BTSP] Greeting the master [%s] => 'HELLO'", master)
    send_multipart_u(sock, ["HELLO", str(PROTOCOL_VERSION), __version__])

    while True:
        (leaving, msg) = recv_from_master("[BTSP] ", poller, pipe_r, sock)
        if leaving:
            return False

        # Parse the message
        try:
            message = u(msg[0])
        except (IndexError, TypeError):
            # msg is None when nothing was received: not worth an error
            if msg is not None:
                LOG.error("[BTSP] Invalid message from master: %s", msg)
        else:
            if message == "HELLO_OK":
                LOG.info("[BTSP] Connection with master [%s] established", master)
                return True
            LOG.info("[BTSP] Unexpected message from master: %s", message)

        if verify_socket(sock, master, ipv6):
            LOG.info(
                "[BTSP] Greeting master => 'HELLO_RETRY' (using the same version?)"
            )
            send_multipart_u(sock, ["HELLO_RETRY", str(PROTOCOL_VERSION), __version__])


def destroy_context(context, sock, read_pipe, write_pipe):
    """
    Clean up function to close ZMQ and related objects.

    The socket is closed without lingering: pending messages are dropped.

    :param context: The zmq context to terminate.
    :param sock: The zmq socket to close.
    :param read_pipe: The read pipe to close.
    :param write_pipe: The write pipe to close.
    """
    LOG.info("[EXIT] Closing sock and pipes, dropping messages")
    # Close each end on its own so that a failure on the first one does not
    # leave the second one open.
    for pipe_fd in (read_pipe, write_pipe):
        with contextlib.suppress(OSError):
            os.close(pipe_fd)
    sock.close(linger=0)
    context.term()


def handle(msg, master, jobs, zmq_config, sock):
    """
    Dispatch a message from the master to the matching handler

    :param msg: the master message (the header was removed)
    :param master: the master state
    :param jobs: the jobs database
    :param zmq_config: the zmq configuration given to the started jobs
    :param sock: the zmq socket
    """
    # 1: the first frame is the action
    try:
        action = u(msg[0])
    except (IndexError, TypeError):
        LOG.error("Invalid message from master: %s", msg)
        return

    # 2: find the handler of the action
    handlers = {
        "CANCEL": lambda: handle_cancel(msg, jobs, sock, master),
        "END_OK": lambda: handle_end_ok(msg, jobs),
        "HELLO_OK": lambda: handle_hello_ok(),
        "PONG": lambda: handle_pong(msg, master),
        "START": lambda: handle_start(msg, jobs, sock, master, zmq_config),
        "STATUS": lambda: handle_status(msg, jobs, sock, master),
    }
    handler = handlers.get(action)
    if handler is None:
        # Do not tag the master as alive as the message does not mean
        # anything.
        LOG.error("Unknown action: '%s', args=(%s)", action, msg[1:])
        return

    handler()


def handle_cancel(msg, jobs, sock, master):
    """
    Handle CANCEL messages

    A running job receives SIGTERM and is flagged as canceling. END is sent
    right away when the job is unknown (it is then recorded as finished) or
    when its process is already gone.

    :param msg: the master message, the job id is the second frame
    :param jobs: the jobs database
    :param sock: the zmq socket
    :param master: the master state
    """
    try:
        job_id = int(msg[1])
    except (IndexError, ValueError):
        LOG.error("Invalid message '%s'", msg)
        return
    LOG.info("master => CANCEL(%d)", job_id)

    job = jobs.get(job_id)
    if job is None:
        LOG.debug("[%d] Unknown job", job_id)
        job = jobs.create(job_id, 0, Job.FINISHED, "")
        job.send_end(sock)
    elif job.status == Job.RUNNING:
        if not job.is_running():
            LOG.info("[%d] Job already finished", job.job_id)
            job.send_end(sock)
        else:
            # Do not send the END message now. We don't know if the process
            # will leave right now.
            LOG.debug("[%d] Canceling", job_id)
            job.terminate()
            jobs.update(job_id, Job.CANCELING)

    # Mark the master as alive
    master.received_msg()


def handle_end_ok(msg, jobs):
    """
    Handle END_OK by removing the job resources and the job from the table

    For every directory of STALE_CONFIG, the entry named by "pattern" (or the
    entries matching "glob") is removed. When something was removed and a
    "post-action" is defined, the command is run.

    :param msg: the master message, the job id is the second frame
    :param jobs: the jobs database
    """
    try:
        job_id = int(msg[1])
    except (IndexError, ValueError):
        LOG.error("Invalid message '%s'", msg)
        return
    LOG.info("master => END_OK(%d)", job_id)

    job = jobs.get(job_id)
    if job is None:
        LOG.debug("[%d] Unknown job", job_id)

    # Remove stale resources
    prefix = "" if job is None else job.prefix
    for directory in STALE_CONFIG:
        if "pattern" in STALE_CONFIG[directory]:
            dir_name = STALE_CONFIG[directory]["pattern"].format(
                prefix=prefix, job_id=job_id
            )
            dir_path = os.path.join(directory, dir_name)
            if not os.path.exists(dir_path):
                continue
            LOG.debug("[%d] Removing %s", job_id, dir_path)
            # The path can be a file or a directory: try both
            with contextlib.suppress(OSError):
                os.remove(dir_path)
            shutil.rmtree(dir_path, ignore_errors=True)
        elif "glob" in STALE_CONFIG[directory]:
            pattern_str = STALE_CONFIG[directory]["glob"].format(
                prefix=prefix, job_id=job_id
            )
            pattern = os.path.join(directory, pattern_str)
            removed = False
            for path in glob.iglob(pattern):
                LOG.debug("[%d] Removing %s", job_id, path)
                with contextlib.suppress(OSError):
                    os.remove(path)
                shutil.rmtree(path, ignore_errors=True)
                # Something matched: the post-action has to run
                removed = True
            if not removed:
                continue
        else:
            continue
        action = STALE_CONFIG[directory].get("post-action")
        if action is not None:
            try:
                LOG.debug("---> calling '%s'", " ".join(action))
                subprocess.check_call(action)
            except (OSError, subprocess.CalledProcessError) as exc:
                LOG.error("----> unable to run action: %s", str(exc))

    # Remove from the jobs
    jobs.delete(job_id)

    # Do not mark the master as alive. In fact we are not sending
    # back any data so the master will not be able to mark the
    # slave as alive.


def handle_hello_ok():
    """
    Handle HELLO_OK messages: the message is only logged, there is nothing
    else to do.
    """
    LOG.debug("master => HELLO_OK")


def handle_pong(msg, master):
    """
    Handle PONG messages by marking the master as alive and by using the ping
    interval it sent.

    Messages without a valid interval, or with an interval smaller than
    TIMEOUT, are ignored.

    :param msg: the master message, the interval is the second frame
    :param master: the master state
    """
    try:
        interval = int(msg[1])
    except (IndexError, ValueError):
        LOG.error("Invalid message '%s'", msg)
        return

    if interval >= TIMEOUT:
        LOG.debug("master => PONG(%d)", interval)
        master.received_msg()
        master.ping_interval = interval
    else:
        LOG.error("invalid ping interval (%d) too small", interval)


def handle_start(msg, jobs, sock, master, zmq_config):
    """
    Handle START messages

    An unknown job is started and recorded in the database, then START_OK is
    sent. For a known job, END is sent if it is finished and START_OK
    otherwise.

    :param msg: the master message: after the action come the job id, the job
    definition, the device definition, the dispatcher configuration, the
    environment and the DUT environment
    :param jobs: the jobs database
    :param sock: the zmq socket
    :param master: the master state
    :param zmq_config: the zmq configuration given to the job
    """
    try:
        job_id = int(msg[1])
        job_definition, device_definition, dispatcher_config, env, env_dut = map(
            u, msg[2:]
        )
    except (IndexError, ValueError) as exc:
        LOG.error("Invalid message '%s'. length=%d. %s", msg, len(msg), exc)
        return
    LOG.info("master => START(%d)", job_id)

    job = jobs.get(job_id)
    if job is None:
        # Pretty print configuration
        dispatcher_cfg = yaml_safe_load(dispatcher_config)
        dispatcher_str = str(dispatcher_cfg) if dispatcher_config else ""
        env_str = str(yaml_safe_load(env)) if env else ""
        env_dut_str = str(yaml_safe_load(env_dut)) if env_dut else ""

        LOG.info("[%d] Starting job", job_id)
        LOG.debug("[%d]         : %s", job_id, yaml_safe_load(job_definition))
        LOG.debug("[%d] device  : %s", job_id, device_definition)
        LOG.debug("[%d] dispatch: %s", job_id, dispatcher_str)
        LOG.debug("[%d] env     : %s", job_id, env_str)
        LOG.debug("[%d] env-dut : %s", job_id, env_dut_str)

        # Start the job, grab the pid and create it in the database
        pid = start_job(
            job_id,
            job_definition,
            device_definition,
            zmq_config,
            dispatcher_config,
            env,
            env_dut,
        )
        started = pid is not None
        # A job that failed to start is recorded as finished, with pid 0
        job = jobs.create(
            job_id,
            pid if started else 0,
            Job.RUNNING if started else Job.FINISHED,
            dispatcher_cfg,
        )
        job.send_start_ok(sock)
    elif job.status == Job.FINISHED:
        # Known job: signal the end of the job...
        LOG.warning("[%d] Job already finished", job_id)
        job.send_end(sock)
    else:
        # ... or ignore the duplicated START
        LOG.debug("[%d] Job already running", job_id)
        job.send_start_ok(sock)

    # Mark the master as alive
    master.received_msg()


def handle_status(msg, jobs, sock, master):
    """
    Handle STATUS messages

    END is sent for finished and unknown jobs (an unknown job is first
    recorded as finished), START_OK for the other ones.

    :param msg: the master message, the job id is the second frame
    :param jobs: the jobs database
    :param sock: the zmq socket
    :param master: the master state
    """
    try:
        job_id = int(msg[1])
    except (IndexError, ValueError):
        LOG.error("Invalid message '%s'", msg)
        return
    LOG.info("master => STATUS(%d)", job_id)

    job = jobs.get(job_id)
    if job is None:
        # Unknown job: return END anyway
        LOG.debug("[%d] Unknown job, sending END after STATUS", job_id)
        job = jobs.create(job_id, 0, Job.FINISHED, "")
        job.send_end(sock)
    elif job.status == Job.FINISHED:
        # The job has already ended
        LOG.debug("[%d] job already finished", job_id)
        job.send_end(sock)
    else:
        # The job is still running
        LOG.debug("[%d] job is running", job_id)
        job.send_start_ok(sock)

    # Mark the master as alive
    master.received_msg()


def setup_parser():
    """
    Build the command line parser of the slave.

    The --hostname default is computed by get_fqdn() when the parser is
    built.

    :return the argparse parser
    """
    parser = argparse.ArgumentParser(description="LAVA Slave")
    parser.add_argument(
        "--hostname", type=str, default=get_fqdn(), help="Name of the slave"
    )
    parser.add_argument("--debug", action="store_true", help="Debug lava-run")

    # Storage
    storage_group = parser.add_argument_group("storage")
    storage_group.add_argument(
        "--slave-dir", type=str, default=SLAVE_DIR, help="Path to slave data storage"
    )

    # Network
    network_group = parser.add_argument_group("network")
    network_group.add_argument(
        "--master", type=str, required=True, help="Main master socket"
    )
    network_group.add_argument(
        "--socket-addr", type=str, required=True, help="Log socket"
    )
    network_group.add_argument(
        "--socks-proxy", type=str, default=None, help="Connect using a socks proxy"
    )
    network_group.add_argument("--ipv6", action="store_true", help="Enable IPv6")

    # Encryption
    encryption_group = parser.add_argument_group("encryption")
    encryption_group.add_argument(
        "--encrypt", action="store_true", help="Encrypt messages"
    )
    encryption_group.add_argument(
        "--master-cert",
        type=str,
        default="/etc/lava-dispatcher/certificates.d/master.key",
        help="Master certificate file",
    )
    encryption_group.add_argument(
        "--slave-cert",
        type=str,
        default="/etc/lava-dispatcher/certificates.d/slave.key_secret",
        help="Slave certificate file",
    )

    # Logging
    logging_group = parser.add_argument_group("logging")
    logging_group.add_argument(
        "--log-file",
        type=str,
        default="/var/log/lava-dispatcher/lava-slave.log",
        help="Log file for the slave logs",
    )
    logging_group.add_argument(
        "--level",
        "-l",
        type=str,
        default="INFO",
        choices=["DEBUG", "ERROR", "INFO", "WARN"],
        help="Log level, default to INFO",
    )

    return parser


def setup_logger(log_file, level):
    """
    Attach a handler to the slave logger and set its level

    :param log_file: the log_file or "-" for sys.stdout
    :param level: the log level: "ERROR", "WARN" or "INFO". Any other value
    selects DEBUG.
    """
    # Log to stdout or to a file that can be rotated under our feet
    handler = (
        logging.StreamHandler(sys.stdout)
        if log_file == "-"
        else logging.handlers.WatchedFileHandler(log_file)
    )
    handler.setFormatter(logging.Formatter(FORMAT))
    LOG.addHandler(handler)

    # Set-up the LOG level
    levels = {
        "ERROR": logging.ERROR,
        "WARN": logging.WARN,
        "INFO": logging.INFO,
    }
    LOG.setLevel(levels.get(level, logging.DEBUG))

def main():
    """
    Entry point of the slave.

    Parse the command line, configure the logger, create the zmq context and
    the jobs database, greet the master, then handle the master messages
    until a signal is received.

    :return the exit code: 0 when leaving after a signal in the main loop,
    1 on error or when interrupted before the master answered
    """
    global debug, tmp_dir

    # Parse command line
    options = setup_parser().parse_args()

    # Setup logger
    setup_logger(options.log_file, options.level)
    LOG.info("[INIT] LAVA slave has started.")
    LOG.info("[INIT] Version %s", __version__)
    LOG.info("[INIT] Using protocol version %d", PROTOCOL_VERSION)

    debug = options.debug

    # Check the hostname because "lava-logs" is reserved
    if options.hostname == "lava-logs":
        LOG.error("[INIT] 'lava-logs' is a reserved hostname")
        return 1

    try:
        ctx, sock, poller, pipe_r, pipe_w = create_context(options)
    except Exception as exc:  # pylint: disable=broad-except
        LOG.error("[INIT] %s", str(exc))
        LOG.exception(exc)
        return 1

    # slave states
    master = Master()

    mkdir(options.slave_dir)
    if options.slave_dir != SLAVE_DIR:
        default_tmp_dir = tmp_dir
        tmp_dir = os.path.join(options.slave_dir, "tmp")
        # The job directories are now created under the new tmp_dir: the
        # stale resources cleanup must look there, not under the default
        # directory that this slave does not use.
        stale_cfg = STALE_CONFIG.get(default_tmp_dir)
        if stale_cfg is not None:
            if default_tmp_dir != DISPATCHER_DOWNLOAD_DIR:
                del STALE_CONFIG[default_tmp_dir]
            STALE_CONFIG.setdefault(tmp_dir, dict(stale_cfg))

    # TODO: make this configurable
    jobs = JobsDB(os.path.join(options.slave_dir, "db.sqlite3"))
    last_jobs_check = time.time()
    # The certificates are only given to the jobs when encryption is enabled
    if options.encrypt:
        zmq_config = ZMQConfig(
            options.socket_addr,
            options.master_cert,
            options.slave_cert,
            options.socks_proxy,
            options.ipv6,
        )
    else:
        zmq_config = ZMQConfig(
            options.socket_addr, None, None, options.socks_proxy, options.ipv6
        )

    # Main loop
    try:
        LOG.info(
            "[BTSP] Connecting to master [%s] as <%s>", options.master, options.hostname
        )
        if not connect_to_master(poller, pipe_r, sock, options.master, options.ipv6):
            return 1
        master.received_msg()

        (leaving, msg) = recv_from_master("", poller, pipe_r, sock)
        while not leaving:
            # If the message is not empty, handle it
            if msg is not None:
                handle(msg, master, jobs, zmq_config, sock)
            # Ping the master if needed
            master.ping(sock)
            # Regular checks
            last_jobs_check = check_job_status(jobs, sock, last_jobs_check)
            # Listen to the master
            (leaving, msg) = recv_from_master("", poller, pipe_r, sock)

    except Exception as exc:  # pylint: disable=broad-except
        LOG.error("[EXIT] %s", str(exc))
        LOG.exception(exc)
        return 1
    finally:
        # Always release the zmq objects and the pipes
        LOG.info("[EXIT] destroying the context")
        destroy_context(ctx, sock, pipe_r, pipe_w)

    return 0


# Only run the slave when executed as a script; the value returned by main()
# is the exit code of the process.
if __name__ == "__main__":
    sys.exit(main())
